package cn.doitedu.tech.validate

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.MapState
import org.apache.flink.api.common.state.MapStateDescriptor

/**
 * Rule calculator that keeps a per-user counter in a Flink {@code MapState}.
 *
 * The state maps the value returned by {@code EventBean.getUserId()} to the number of
 * events with event id "A" that have been passed to {@link #tagCaculate}, on top of any
 * initial values supplied to {@link #open}.
 */
class Rule01Caculator implements IRuleCaculator {

    /** Backing state handed in through {@link #open}; keyed by user id, value is the counter. */
    MapState<Integer,Integer> state;

    /**
     * Stores the state handle and seeds it with the given initial counters.
     *
     * @param mapState   state used to hold the counters
     * @param initValues initial counters to copy into the state; {@code null} means nothing to seed
     */
    @Override
    void open(MapState<Integer,Integer> mapState,HashMap<Integer,Integer> initValues) {
        this.state = mapState

        // Nothing to seed: keep the state as handed in instead of failing on a null map.
        if (initValues == null) {
            return
        }

        for (seed in initValues.entrySet()) {
            state.put(seed.getKey(), seed.getValue())
        }
    }

    /**
     * Increments the counter of the event's user when the event id is "A".
     * Events with any other (or no) event id leave the state untouched.
     *
     * @param eventBean the incoming event
     */
    @Override
    void tagCaculate(EventBean eventBean) {
        // Constant-first comparison so an event without an event id is skipped
        // rather than raising a NullPointerException.
        if (!"A".equals(eventBean.getEventId())) {
            return
        }

        def userId = eventBean.getUserId()
        Integer previous = state.get(userId)
        // First matching event for this user starts the counter at 1.
        state.put(userId, previous == null ? 1 : previous + 1)
    }

    /**
     * Returns the current counters as a string.
     *
     * @return the {@code toString()} of a {@code HashMap} holding a copy of every state entry
     */
    @Override
    String tagQuery() {
        Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>()
        for (entry in state.iterator()) {
            snapshot.put(entry.getKey(), entry.getValue())
        }
        return snapshot.toString()
    }
}
